// Copyright 2019 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! High level manager of the network.
//!
//! A [`Swarm`] contains the state of the network as a whole. The entire
//! behaviour of a libp2p network can be controlled through the `Swarm`.
//! The `Swarm` struct contains all active and pending connections to
//! remotes and manages the state of all the substreams that have been
//! opened, and all the upgrades that were built upon these substreams.
//!
//! # Initializing a Swarm
//!
//! Creating a `Swarm` requires three things:
//!
//!  1. A network identity of the local node in form of a [`PeerId`].
//!  2. An implementation of the [`Transport`] trait. This is the type that
//!     will be used in order to reach nodes on the network based on their
//!     address. See the `transport` module for more information.
//!  3. An implementation of the [`NetworkBehaviour`] trait. This is a state
//!     machine that defines how the swarm should behave once it is connected
//!     to a node.
//!
//! # Network Behaviour
//!
//! The [`NetworkBehaviour`] trait is implemented on types that indicate to
//! the swarm how it should behave. This includes which protocols are supported
//! and which nodes to try to connect to. It is the `NetworkBehaviour` that
//! controls what happens on the network. Multiple types that implement
//! `NetworkBehaviour` can be composed into a single behaviour.
//!
//! # Protocols Handler
//!
//! The [`ConnectionHandler`] trait defines how each active connection to a
//! remote should behave: how to handle incoming substreams, which protocols
//! are supported, when to open a new outbound substream, etc.
//!

mod connection;
mod registry;
#[cfg(test)]
mod test;
mod upgrade;

pub mod behaviour;
pub mod dial_opts;
pub mod handler;

pub use behaviour::{
    CloseConnection, NetworkBehaviour, NetworkBehaviourAction, NetworkBehaviourEventProcess,
    NotifyHandler, PollParameters,
};
pub use connection::{
    ConnectionCounters, ConnectionError, ConnectionLimit, ConnectionLimits, PendingConnectionError,
    PendingInboundConnectionError, PendingOutboundConnectionError,
};
pub use handler::{
    ConnectionHandler, ConnectionHandlerEvent, ConnectionHandlerSelect, ConnectionHandlerUpgrErr,
    IntoConnectionHandler, IntoConnectionHandlerSelect, KeepAlive, OneShotHandler,
    OneShotHandlerConfig, SubstreamProtocol,
};
pub use registry::{AddAddressResult, AddressRecord, AddressScore};

use connection::pool::{Pool, PoolConfig, PoolEvent};
use connection::{EstablishedConnection, IncomingInfo, ListenersEvent, ListenersStream, Substream};
use dial_opts::{DialOpts, PeerCondition};
use either::Either;
use futures::{executor::ThreadPoolBuilder, prelude::*, stream::FusedStream};
use libp2p_core::connection::{ConnectionId, PendingPoint};
use libp2p_core::{
    connection::{ConnectedPoint, ListenerId},
    multiaddr::Protocol,
    multihash::Multihash,
    muxing::StreamMuxerBox,
    transport::{self, TransportError},
    upgrade::ProtocolName,
    Endpoint, Executor, Multiaddr, Negotiated, PeerId, Transport,
};
use registry::{AddressIntoIter, Addresses};
use smallvec::SmallVec;
use std::collections::HashSet;
use std::iter;
use std::num::{NonZeroU32, NonZeroU8, NonZeroUsize};
use std::{
    convert::TryFrom,
    error, fmt, io,
    pin::Pin,
    task::{Context, Poll},
};
use upgrade::UpgradeInfoSend as _;

/// Substream for which a protocol has been chosen.
///
/// Implements the [`AsyncRead`](futures::io::AsyncRead) and
/// [`AsyncWrite`](futures::io::AsyncWrite) traits.
pub type NegotiatedSubstream = Negotiated<Substream<StreamMuxerBox>>;

/// Event generated by the [`NetworkBehaviour`] that the swarm will report back.
type TBehaviourOutEvent<TBehaviour> = <TBehaviour as NetworkBehaviour>::OutEvent;

/// [`ConnectionHandler`] of the [`NetworkBehaviour`] for all the protocols the [`NetworkBehaviour`]
/// supports.
type THandler<TBehaviour> = <TBehaviour as NetworkBehaviour>::ConnectionHandler;

/// Custom event that can be received by the [`ConnectionHandler`] of the
/// [`NetworkBehaviour`].
type THandlerInEvent<TBehaviour> =
    <<THandler<TBehaviour> as IntoConnectionHandler>::Handler as ConnectionHandler>::InEvent;

/// Custom event that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`].
type THandlerOutEvent<TBehaviour> =
    <<THandler<TBehaviour> as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent;

/// Custom error that can be produced by the [`ConnectionHandler`] of the [`NetworkBehaviour`].
type THandlerErr<TBehaviour> =
    <<THandler<TBehaviour> as IntoConnectionHandler>::Handler as ConnectionHandler>::Error;

/// Event generated by the `Swarm`.
#[derive(Debug)]
pub enum SwarmEvent<TBehaviourOutEvent, THandlerErr> {
    /// Event generated by the `NetworkBehaviour`.
    Behaviour(TBehaviourOutEvent),
    /// A connection to the given peer has been opened.
    ConnectionEstablished {
        /// Identity of the peer that we have connected to.
        peer_id: PeerId,
        /// Endpoint of the connection that has been opened.
        endpoint: ConnectedPoint,
        /// Number of established connections to this peer, including the one that has just been
        /// opened.
        num_established: NonZeroU32,
        /// [`Some`] when the new connection is an outgoing connection.
        /// Addresses are dialed concurrently. Contains the addresses and errors
        /// of dial attempts that failed before the one successful dial.
        concurrent_dial_errors: Option<Vec<(Multiaddr, TransportError<io::Error>)>>,
    },
    /// A connection with the given peer has been closed,
    /// possibly as a result of an error.
    ConnectionClosed {
        /// Identity of the peer that we have connected to.
        peer_id: PeerId,
        /// Endpoint of the connection that has been closed.
        endpoint: ConnectedPoint,
        /// Number of other remaining connections to this same peer.
        num_established: u32,
        /// Reason for the disconnection, if it was not a successful
        /// active close.
        cause: Option<ConnectionError<THandlerErr>>,
    },
    /// A new connection arrived on a listener and is in the process of protocol negotiation.
    ///
    /// A corresponding [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished),
    /// [`BannedPeer`](SwarmEvent::BannedPeer), or
    /// [`IncomingConnectionError`](SwarmEvent::IncomingConnectionError) event will later be
    /// generated for this connection.
    IncomingConnection {
        /// Local connection address.
        /// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
        /// event.
        local_addr: Multiaddr,
        /// Address used to send back data to the remote.
        send_back_addr: Multiaddr,
    },
    /// An error happened on a connection during its initial handshake.
    ///
    /// This can include, for example, an error during the handshake of the encryption layer, or
    /// the connection unexpectedly closed.
    IncomingConnectionError {
        /// Local connection address.
        /// This address has been earlier reported with a [`NewListenAddr`](SwarmEvent::NewListenAddr)
        /// event.
        local_addr: Multiaddr,
        /// Address used to send back data to the remote.
        send_back_addr: Multiaddr,
        /// The error that happened.
        error: PendingInboundConnectionError<io::Error>,
    },
    /// Outgoing connection attempt failed.
    OutgoingConnectionError {
        /// If known, [`PeerId`] of the peer we tried to reach.
        peer_id: Option<PeerId>,
        /// Error that has been encountered.
        error: DialError,
    },
    /// We connected to a peer, but we immediately closed the connection because that peer is banned.
    BannedPeer {
        /// Identity of the banned peer.
        peer_id: PeerId,
        /// Endpoint of the connection that has been closed.
        endpoint: ConnectedPoint,
    },
    /// One of our listeners has reported a new local listening address.
    NewListenAddr {
        /// The listener that is listening on the new address.
        listener_id: ListenerId,
        /// The new address that is being listened on.
        address: Multiaddr,
    },
    /// One of our listeners has reported the expiration of a listening address.
    ExpiredListenAddr {
        /// The listener that is no longer listening on the address.
        listener_id: ListenerId,
        /// The expired address.
        address: Multiaddr,
    },
    /// One of the listeners gracefully closed.
    ListenerClosed {
        /// The listener that closed.
        listener_id: ListenerId,
        /// The addresses that the listener was listening on. These addresses are now considered
        /// expired, similar to if a [`ExpiredListenAddr`](SwarmEvent::ExpiredListenAddr) event
        /// has been generated for each of them.
        addresses: Vec<Multiaddr>,
        /// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
        /// if the stream produced an error.
        reason: Result<(), io::Error>,
    },
    /// One of the listeners reported a non-fatal error.
    ListenerError {
        /// The listener that errored.
        listener_id: ListenerId,
        /// The listener error.
        error: io::Error,
    },
    /// A new dialing attempt has been initiated by the [`NetworkBehaviour`]
    /// implementation.
    ///
    /// A [`ConnectionEstablished`](SwarmEvent::ConnectionEstablished) event is
    /// reported if the dialing attempt succeeds, otherwise a
    /// [`OutgoingConnectionError`](SwarmEvent::OutgoingConnectionError) event
    /// is reported.
    Dialing(PeerId),
}

/// Contains the state of the network, plus the way it should behave.
///
/// Note: Needs to be polled via `<Swarm as Stream>` in order to make
/// progress.
pub struct Swarm<TBehaviour>
where
    TBehaviour: NetworkBehaviour,
{
    /// Listeners for incoming connections.
    listeners: ListenersStream<transport::Boxed<(PeerId, StreamMuxerBox)>>,

    /// The nodes currently active.
    pool: Pool<THandler<TBehaviour>, transport::Boxed<(PeerId, StreamMuxerBox)>>,

    /// The local peer ID.
    local_peer_id: PeerId,

    /// Handles which nodes to connect to and how to handle the events sent back by the protocol
    /// handlers.
    behaviour: TBehaviour,

    /// List of protocols that the behaviour says it supports.
    supported_protocols: SmallVec<[Vec<u8>; 16]>,

    /// List of multiaddresses we're listening on.
    listened_addrs: SmallVec<[Multiaddr; 8]>,

    /// List of multiaddresses we're listening on, after account for external IP addresses and
    /// similar mechanisms.
    external_addrs: Addresses,

    /// List of nodes for which we deny any incoming connection.
    banned_peers: HashSet<PeerId>,

    /// Connections for which we withhold any reporting. These belong to banned peers.
    ///
    /// Note: Connections to a peer that are established at the time of banning that peer
    /// are not added here. Instead they are simply closed.
    banned_peer_connections: HashSet<ConnectionId>,

    /// Pending event to be delivered to connection handlers
    /// (or dropped if the peer disconnected) before the `behaviour`
    /// can be polled again.
    pending_event: Option<(PeerId, PendingNotifyHandler, THandlerInEvent<TBehaviour>)>,
}

impl<TBehaviour> Unpin for Swarm<TBehaviour> where TBehaviour: NetworkBehaviour {}

impl<TBehaviour> Swarm<TBehaviour>
where
    TBehaviour: NetworkBehaviour,
{
    /// Builds a new `Swarm`.
    pub fn new(
        transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
        behaviour: TBehaviour,
        local_peer_id: PeerId,
    ) -> Self {
        SwarmBuilder::new(transport, behaviour, local_peer_id).build()
    }

    /// Returns information about the connections underlying the [`Swarm`].
    pub fn network_info(&self) -> NetworkInfo {
        let num_peers = self.pool.num_peers();
        let connection_counters = self.pool.counters().clone();
        NetworkInfo {
            num_peers,
            connection_counters,
        }
    }

    /// Starts listening on the given address.
    /// Returns an error if the address is not supported.
    ///
    /// Listeners report their new listening addresses as [`SwarmEvent::NewListenAddr`].
    /// Depending on the underlying transport, one listener may have multiple listening addresses.
    pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<io::Error>> {
        let id = self.listeners.listen_on(addr)?;
        self.behaviour.inject_new_listener(id);
        Ok(id)
    }

    /// Remove some listener.
    ///
    /// Returns `true` if there was a listener with this ID, `false`
    /// otherwise.
    pub fn remove_listener(&mut self, id: ListenerId) -> bool {
        self.listeners.remove_listener(id)
    }

    /// Dial a known or unknown peer.
    ///
    /// See also [`DialOpts`].
    ///
    /// ```
    /// # use libp2p_swarm::Swarm;
    /// # use libp2p_swarm::dial_opts::{DialOpts, PeerCondition};
    /// # use libp2p_core::{Multiaddr, PeerId, Transport};
    /// # use libp2p_core::transport::dummy::DummyTransport;
    /// # use libp2p_swarm::DummyBehaviour;
    /// #
    /// let mut swarm = Swarm::new(
    ///   DummyTransport::new().boxed(),
    ///   DummyBehaviour::default(),
    ///   PeerId::random(),
    /// );
    ///
    /// // Dial a known peer.
    /// swarm.dial(PeerId::random());
    ///
    /// // Dial an unknown peer.
    /// swarm.dial("/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap());
    /// ```
    pub fn dial(&mut self, opts: impl Into<DialOpts>) -> Result<(), DialError> {
        let handler = self.behaviour.new_handler();
        self.dial_with_handler(opts.into(), handler)
    }

    fn dial_with_handler(
        &mut self,
        swarm_dial_opts: DialOpts,
        handler: <TBehaviour as NetworkBehaviour>::ConnectionHandler,
    ) -> Result<(), DialError> {
        let (peer_id, addresses, dial_concurrency_factor_override, role_override) =
            match swarm_dial_opts.0 {
                // Dial a known peer.
                dial_opts::Opts::WithPeerId(dial_opts::WithPeerId {
                    peer_id,
                    condition,
                    role_override,
                    dial_concurrency_factor_override,
                })
                | dial_opts::Opts::WithPeerIdWithAddresses(dial_opts::WithPeerIdWithAddresses {
                    peer_id,
                    condition,
                    role_override,
                    dial_concurrency_factor_override,
                    ..
                }) => {
                    // Check [`PeerCondition`] if provided.
                    let condition_matched = match condition {
                        PeerCondition::Disconnected => !self.is_connected(&peer_id),
                        PeerCondition::NotDialing => {
                            !self
                                .pool
                                .iter_pending_info()
                                .any(move |(_, endpoint, peer)| {
                                    matches!(endpoint, PendingPoint::Dialer { .. })
                                        && peer.as_ref() == Some(&peer_id)
                                })
                        }
                        PeerCondition::Always => true,
                    };
                    if !condition_matched {
                        self.behaviour.inject_dial_failure(
                            Some(peer_id),
                            handler,
                            &DialError::DialPeerConditionFalse(condition),
                        );

                        return Err(DialError::DialPeerConditionFalse(condition));
                    }

                    // Check if peer is banned.
                    if self.banned_peers.contains(&peer_id) {
                        let error = DialError::Banned;
                        self.behaviour
                            .inject_dial_failure(Some(peer_id), handler, &error);
                        return Err(error);
                    }

                    // Retrieve the addresses to dial.
                    let addresses = {
                        let mut addresses = match swarm_dial_opts.0 {
                            dial_opts::Opts::WithPeerId(dial_opts::WithPeerId { .. }) => {
                                self.behaviour.addresses_of_peer(&peer_id)
                            }
                            dial_opts::Opts::WithPeerIdWithAddresses(
                                dial_opts::WithPeerIdWithAddresses {
                                    peer_id,
                                    mut addresses,
                                    extend_addresses_through_behaviour,
                                    ..
                                },
                            ) => {
                                if extend_addresses_through_behaviour {
                                    addresses.extend(self.behaviour.addresses_of_peer(&peer_id))
                                }
                                addresses
                            }
                            dial_opts::Opts::WithoutPeerIdWithAddress { .. } => {
                                unreachable!("Due to outer match.")
                            }
                        };

                        let mut unique_addresses = HashSet::new();
                        addresses.retain(|a| {
                            !self.listened_addrs.contains(a) && unique_addresses.insert(a.clone())
                        });

                        if addresses.is_empty() {
                            let error = DialError::NoAddresses;
                            self.behaviour
                                .inject_dial_failure(Some(peer_id), handler, &error);
                            return Err(error);
                        };

                        addresses
                    };

                    (
                        Some(peer_id),
                        Either::Left(addresses.into_iter()),
                        dial_concurrency_factor_override,
                        role_override,
                    )
                }
                // Dial an unknown peer.
                dial_opts::Opts::WithoutPeerIdWithAddress(
                    dial_opts::WithoutPeerIdWithAddress {
                        address,
                        role_override,
                    },
                ) => {
                    // If the address ultimately encapsulates an expected peer ID, dial that peer
                    // such that any mismatch is detected. We do not "pop off" the `P2p` protocol
                    // from the address, because it may be used by the `Transport`, i.e. `P2p`
                    // is a protocol component that can influence any transport, like `libp2p-dns`.
                    let peer_id = match address
                        .iter()
                        .last()
                        .and_then(|p| {
                            if let Protocol::P2p(ma) = p {
                                Some(PeerId::try_from(ma))
                            } else {
                                None
                            }
                        })
                        .transpose()
                    {
                        Ok(peer_id) => peer_id,
                        Err(multihash) => return Err(DialError::InvalidPeerId(multihash)),
                    };

                    (
                        peer_id,
                        Either::Right(iter::once(address)),
                        None,
                        role_override,
                    )
                }
            };

        let dials = addresses
            .map(|a| match p2p_addr(peer_id, a) {
                Ok(address) => {
                    let dial = match role_override {
                        Endpoint::Dialer => self.listeners.transport_mut().dial(address.clone()),
                        Endpoint::Listener => self
                            .listeners
                            .transport_mut()
                            .dial_as_listener(address.clone()),
                    };
                    match dial {
                        Ok(fut) => fut
                            .map(|r| (address, r.map_err(TransportError::Other)))
                            .boxed(),
                        Err(err) => futures::future::ready((address, Err(err))).boxed(),
                    }
                }
                Err(address) => futures::future::ready((
                    address.clone(),
                    Err(TransportError::MultiaddrNotSupported(address)),
                ))
                .boxed(),
            })
            .collect();

        match self.pool.add_outgoing(
            dials,
            peer_id,
            handler,
            role_override,
            dial_concurrency_factor_override,
        ) {
            Ok(_connection_id) => Ok(()),
            Err((connection_limit, handler)) => {
                let error = DialError::ConnectionLimit(connection_limit);
                self.behaviour.inject_dial_failure(None, handler, &error);
                Err(error)
            }
        }
    }

    /// Returns an iterator that produces the list of addresses we're listening on.
    pub fn listeners(&self) -> impl Iterator<Item = &Multiaddr> {
        self.listeners.listen_addrs()
    }

    /// Returns the peer ID of the swarm passed as parameter.
    pub fn local_peer_id(&self) -> &PeerId {
        &self.local_peer_id
    }

    /// Returns an iterator for [`AddressRecord`]s of external addresses
    /// of the local node, in decreasing order of their current
    /// [score](AddressScore).
    pub fn external_addresses(&self) -> impl Iterator<Item = &AddressRecord> {
        self.external_addrs.iter()
    }

    /// Adds an external address record for the local node.
    ///
    /// An external address is an address of the local node known to
    /// be (likely) reachable for other nodes, possibly taking into
    /// account NAT. The external addresses of the local node may be
    /// shared with other nodes by the `NetworkBehaviour`.
    ///
    /// The associated score determines both the position of the address
    /// in the list of external addresses (which can determine the
    /// order in which addresses are used to connect to) as well as
    /// how long the address is retained in the list, depending on
    /// how frequently it is reported by the `NetworkBehaviour` via
    /// [`NetworkBehaviourAction::ReportObservedAddr`] or explicitly
    /// through this method.
    pub fn add_external_address(&mut self, a: Multiaddr, s: AddressScore) -> AddAddressResult {
        let result = self.external_addrs.add(a.clone(), s);
        let expired = match &result {
            AddAddressResult::Inserted { expired } => {
                self.behaviour.inject_new_external_addr(&a);
                expired
            }
            AddAddressResult::Updated { expired } => expired,
        };
        for a in expired {
            self.behaviour.inject_expired_external_addr(&a.addr);
        }
        result
    }

    /// Removes an external address of the local node, regardless of
    /// its current score. See [`Swarm::add_external_address`]
    /// for details.
    ///
    /// Returns `true` if the address existed and was removed, `false`
    /// otherwise.
    pub fn remove_external_address(&mut self, addr: &Multiaddr) -> bool {
        if self.external_addrs.remove(addr) {
            self.behaviour.inject_expired_external_addr(addr);
            true
        } else {
            false
        }
    }

    /// Bans a peer by its peer ID.
    ///
    /// Any incoming connection and any dialing attempt will immediately be rejected.
    /// This function has no effect if the peer is already banned.
    pub fn ban_peer_id(&mut self, peer_id: PeerId) {
        if self.banned_peers.insert(peer_id) {
            // Note that established connections to the now banned peer are closed but not
            // added to [`Swarm::banned_peer_connections`]. They have been previously reported
            // as open to the behaviour and need be reported as closed once closing the
            // connection finishes.
            self.pool.disconnect(peer_id);
        }
    }

    /// Unbans a peer.
    pub fn unban_peer_id(&mut self, peer_id: PeerId) {
        self.banned_peers.remove(&peer_id);
    }

    /// Disconnects a peer by its peer ID, closing all connections to said peer.
    ///
    /// Returns `Ok(())` if there was one or more established connections to the peer.
    ///
    /// Note: Closing a connection via [`Swarm::disconnect_peer_id`] does
    /// not inform the corresponding [`ConnectionHandler`].
    /// Closing a connection via a [`ConnectionHandler`] can be done either in a
    /// collaborative manner across [`ConnectionHandler`]s
    /// with [`ConnectionHandler::connection_keep_alive`] or directly with
    /// [`ConnectionHandlerEvent::Close`].
    pub fn disconnect_peer_id(&mut self, peer_id: PeerId) -> Result<(), ()> {
        let was_connected = self.pool.is_connected(peer_id);
        self.pool.disconnect(peer_id);

        if was_connected {
            Ok(())
        } else {
            Err(())
        }
    }

    /// Checks whether there is an established connection to a peer.
    pub fn is_connected(&self, peer_id: &PeerId) -> bool {
        self.pool.is_connected(*peer_id)
    }

    /// Returns the currently connected peers.
    pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
        self.pool.iter_connected()
    }

    /// Returns a reference to the provided [`NetworkBehaviour`].
    pub fn behaviour(&self) -> &TBehaviour {
        &self.behaviour
    }

    /// Returns a mutable reference to the provided [`NetworkBehaviour`].
    pub fn behaviour_mut(&mut self) -> &mut TBehaviour {
        &mut self.behaviour
    }

    /// Internal function used by everything event-related.
    ///
    /// Polls the `Swarm` for the next event.
    fn poll_next_event(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<SwarmEvent<TBehaviour::OutEvent, THandlerErr<TBehaviour>>> {
        // We use a `this` variable because the compiler can't mutably borrow multiple times
        // across a `Deref`.
        let this = &mut *self;

        loop {
            let mut listeners_not_ready = false;
            let mut connections_not_ready = false;

            // Poll the listener(s) for new connections.
            match ListenersStream::poll(Pin::new(&mut this.listeners), cx) {
                Poll::Pending => {
                    listeners_not_ready = true;
                }
                Poll::Ready(ListenersEvent::Incoming {
                    listener_id: _,
                    upgrade,
                    local_addr,
                    send_back_addr,
                }) => {
                    let handler = this.behaviour.new_handler();
                    match this.pool.add_incoming(
                        upgrade,
                        handler,
                        IncomingInfo {
                            local_addr: &local_addr,
                            send_back_addr: &send_back_addr,
                        },
                    ) {
                        Ok(_connection_id) => {
                            return Poll::Ready(SwarmEvent::IncomingConnection {
                                local_addr,
                                send_back_addr,
                            });
                        }
                        Err((connection_limit, handler)) => {
                            this.behaviour.inject_listen_failure(
                                &local_addr,
                                &send_back_addr,
                                handler,
                            );
                            log::warn!("Incoming connection rejected: {:?}", connection_limit);
                        }
                    };
                }
                Poll::Ready(ListenersEvent::NewAddress {
                    listener_id,
                    listen_addr,
                }) => {
                    log::debug!("Listener {:?}; New address: {:?}", listener_id, listen_addr);
                    if !this.listened_addrs.contains(&listen_addr) {
                        this.listened_addrs.push(listen_addr.clone())
                    }
                    this.behaviour
                        .inject_new_listen_addr(listener_id, &listen_addr);
                    return Poll::Ready(SwarmEvent::NewListenAddr {
                        listener_id,
                        address: listen_addr,
                    });
                }
                Poll::Ready(ListenersEvent::AddressExpired {
                    listener_id,
                    listen_addr,
                }) => {
                    log::debug!(
                        "Listener {:?}; Expired address {:?}.",
                        listener_id,
                        listen_addr
                    );
                    this.listened_addrs.retain(|a| a != &listen_addr);
                    this.behaviour
                        .inject_expired_listen_addr(listener_id, &listen_addr);
                    return Poll::Ready(SwarmEvent::ExpiredListenAddr {
                        listener_id,
                        address: listen_addr,
                    });
                }
                Poll::Ready(ListenersEvent::Closed {
                    listener_id,
                    addresses,
                    reason,
                }) => {
                    log::debug!("Listener {:?}; Closed by {:?}.", listener_id, reason);
                    for addr in addresses.iter() {
                        this.behaviour.inject_expired_listen_addr(listener_id, addr);
                    }
                    this.behaviour.inject_listener_closed(
                        listener_id,
                        match &reason {
                            Ok(()) => Ok(()),
                            Err(err) => Err(err),
                        },
                    );
                    return Poll::Ready(SwarmEvent::ListenerClosed {
                        listener_id,
                        addresses,
                        reason,
                    });
                }
                Poll::Ready(ListenersEvent::Error { listener_id, error }) => {
                    this.behaviour.inject_listener_error(listener_id, &error);
                    return Poll::Ready(SwarmEvent::ListenerError { listener_id, error });
                }
            }

            // Poll the known peers.
            match this.pool.poll(cx) {
                Poll::Pending => {
                    connections_not_ready = true;
                }
                Poll::Ready(PoolEvent::ConnectionEstablished {
                    connection,
                    other_established_connection_ids,
                    concurrent_dial_errors,
                }) => {
                    let peer_id = connection.peer_id();
                    let endpoint = connection.endpoint().clone();
                    if this.banned_peers.contains(&peer_id) {
                        // Mark the connection for the banned peer as banned, thus withholding any
                        // future events from the connection to the behaviour.
                        this.banned_peer_connections.insert(connection.id());
                        this.pool.disconnect(peer_id);
                        return Poll::Ready(SwarmEvent::BannedPeer { peer_id, endpoint });
                    } else {
                        let num_established = NonZeroU32::new(
                            u32::try_from(other_established_connection_ids.len() + 1).unwrap(),
                        )
                        .expect("n + 1 is always non-zero; qed");
                        let non_banned_established = other_established_connection_ids
                            .into_iter()
                            .filter(|conn_id| !this.banned_peer_connections.contains(conn_id))
                            .count();

                        log::debug!(
                            "Connection established: {:?} {:?}; Total (peer): {}. Total non-banned (peer): {}",
                            connection.peer_id(),
                            connection.endpoint(),
                            num_established,
                            non_banned_established + 1,
                        );
                        let endpoint = connection.endpoint().clone();
                        let failed_addresses = concurrent_dial_errors
                            .as_ref()
                            .map(|es| es.iter().map(|(a, _)| a).cloned().collect());
                        this.behaviour.inject_connection_established(
                            &peer_id,
                            &connection.id(),
                            &endpoint,
                            failed_addresses.as_ref(),
                            non_banned_established,
                        );
                        return Poll::Ready(SwarmEvent::ConnectionEstablished {
                            peer_id,
                            num_established,
                            endpoint,
                            concurrent_dial_errors,
                        });
                    }
                }
                Poll::Ready(PoolEvent::PendingOutboundConnectionError {
                    id: _,
                    error,
                    handler,
                    peer,
                }) => {
                    let error = error.into();

                    this.behaviour.inject_dial_failure(peer, handler, &error);

                    if let Some(peer) = peer {
                        log::debug!("Connection attempt to {:?} failed with {:?}.", peer, error,);
                    } else {
                        log::debug!("Connection attempt to unknown peer failed with {:?}", error);
                    }

                    return Poll::Ready(SwarmEvent::OutgoingConnectionError {
                        peer_id: peer,
                        error,
                    });
                }
                Poll::Ready(PoolEvent::PendingInboundConnectionError {
                    id: _,
                    send_back_addr,
                    local_addr,
                    error,
                    handler,
                }) => {
                    log::debug!("Incoming connection failed: {:?}", error);
                    this.behaviour
                        .inject_listen_failure(&local_addr, &send_back_addr, handler);
                    return Poll::Ready(SwarmEvent::IncomingConnectionError {
                        local_addr,
                        send_back_addr,
                        error,
                    });
                }
                Poll::Ready(PoolEvent::ConnectionClosed {
                    id,
                    connected,
                    error,
                    remaining_established_connection_ids,
                    handler,
                    ..
                }) => {
                    if let Some(error) = error.as_ref() {
                        log::debug!(
                            "Connection closed with error {:?}: {:?}; Total (peer): {}.",
                            error,
                            connected,
                            remaining_established_connection_ids.len()
                        );
                    } else {
                        log::debug!(
                            "Connection closed: {:?}; Total (peer): {}.",
                            connected,
                            remaining_established_connection_ids.len()
                        );
                    }
                    let peer_id = connected.peer_id;
                    let endpoint = connected.endpoint;
                    let num_established =
                        u32::try_from(remaining_established_connection_ids.len()).unwrap();
                    let conn_was_reported = !this.banned_peer_connections.remove(&id);
                    if conn_was_reported {
                        let remaining_non_banned = remaining_established_connection_ids
                            .into_iter()
                            .filter(|conn_id| !this.banned_peer_connections.contains(conn_id))
                            .count();
                        this.behaviour.inject_connection_closed(
                            &peer_id,
                            &id,
                            &endpoint,
                            handler,
                            remaining_non_banned,
                        );
                    }
                    return Poll::Ready(SwarmEvent::ConnectionClosed {
                        peer_id,
                        endpoint,
                        cause: error,
                        num_established,
                    });
                }
                Poll::Ready(PoolEvent::ConnectionEvent { connection, event }) => {
                    let peer = connection.peer_id();
                    let conn_id = connection.id();
                    if this.banned_peer_connections.contains(&conn_id) {
                        log::debug!("Ignoring event from banned peer: {} {:?}.", peer, conn_id);
                    } else {
                        this.behaviour.inject_event(peer, conn_id, event);
                    }
                }
                Poll::Ready(PoolEvent::AddressChange {
                    connection,
                    new_endpoint,
                    old_endpoint,
                }) => {
                    let peer = connection.peer_id();
                    let conn_id = connection.id();
                    if !this.banned_peer_connections.contains(&conn_id) {
                        this.behaviour.inject_address_change(
                            &peer,
                            &conn_id,
                            &old_endpoint,
                            &new_endpoint,
                        );
                    }
                }
            };

            // After the network had a chance to make progress, try to deliver
            // the pending event emitted by the behaviour in the previous iteration
            // to the connection handler(s). The pending event must be delivered
            // before polling the behaviour again. If the targeted peer
            // meanwhie disconnected, the event is discarded.
            if let Some((peer_id, handler, event)) = this.pending_event.take() {
                match handler {
                    PendingNotifyHandler::One(conn_id) => {
                        if let Some(mut conn) = this.pool.get_established(conn_id) {
                            if let Some(event) = notify_one(&mut conn, event, cx) {
                                this.pending_event = Some((peer_id, handler, event));
                                if listeners_not_ready && connections_not_ready {
                                    return Poll::Pending;
                                } else {
                                    continue;
                                }
                            }
                        }
                    }
                    PendingNotifyHandler::Any(ids) => {
                        if let Some((event, ids)) =
                            notify_any::<_, _, TBehaviour>(ids, &mut this.pool, event, cx)
                        {
                            let handler = PendingNotifyHandler::Any(ids);
                            this.pending_event = Some((peer_id, handler, event));
                            if listeners_not_ready && connections_not_ready {
                                return Poll::Pending;
                            } else {
                                continue;
                            }
                        }
                    }
                }
            }

            debug_assert!(this.pending_event.is_none());

            let behaviour_poll = {
                let mut parameters = SwarmPollParameters {
                    local_peer_id: &this.local_peer_id,
                    supported_protocols: &this.supported_protocols,
                    listened_addrs: &this.listened_addrs,
                    external_addrs: &this.external_addrs,
                };
                this.behaviour.poll(cx, &mut parameters)
            };

            match behaviour_poll {
                Poll::Pending if listeners_not_ready && connections_not_ready => {
                    return Poll::Pending
                }
                Poll::Pending => (),
                Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)) => {
                    return Poll::Ready(SwarmEvent::Behaviour(event))
                }
                Poll::Ready(NetworkBehaviourAction::Dial { opts, handler }) => {
                    let peer_id = opts.get_peer_id();
                    if let Ok(()) = this.dial_with_handler(opts, handler) {
                        if let Some(peer_id) = peer_id {
                            return Poll::Ready(SwarmEvent::Dialing(peer_id));
                        }
                    }
                }
                Poll::Ready(NetworkBehaviourAction::NotifyHandler {
                    peer_id,
                    handler,
                    event,
                }) => match handler {
                    NotifyHandler::One(connection) => {
                        if let Some(mut conn) = this.pool.get_established(connection) {
                            if let Some(event) = notify_one(&mut conn, event, cx) {
                                let handler = PendingNotifyHandler::One(connection);
                                this.pending_event = Some((peer_id, handler, event));
                                if listeners_not_ready && connections_not_ready {
                                    return Poll::Pending;
                                } else {
                                    continue;
                                }
                            }
                        }
                    }
                    NotifyHandler::Any => {
                        let ids = this
                            .pool
                            .iter_established_connections_of_peer(&peer_id)
                            .collect();
                        if let Some((event, ids)) =
                            notify_any::<_, _, TBehaviour>(ids, &mut this.pool, event, cx)
                        {
                            let handler = PendingNotifyHandler::Any(ids);
                            this.pending_event = Some((peer_id, handler, event));
                            if listeners_not_ready && connections_not_ready {
                                return Poll::Pending;
                            } else {
                                continue;
                            }
                        }
                    }
                },
                Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address, score }) => {
                    // Maps the given `observed_addr`, representing an address of the local
                    // node observed by a remote peer, onto the locally known listen addresses
                    // to yield one or more addresses of the local node that may be publicly
                    // reachable.
                    //
                    // I.e. this method incorporates the view of other peers into the listen
                    // addresses seen by the local node to account for possible IP and port
                    // mappings performed by intermediate network devices in an effort to
                    // obtain addresses for the local peer that are also reachable for peers
                    // other than the peer who reported the `observed_addr`.
                    //
                    // The translation is transport-specific. See [`Transport::address_translation`].
                    let translated_addresses = {
                        let transport = this.listeners.transport();
                        let mut addrs: Vec<_> = this
                            .listeners
                            .listen_addrs()
                            .filter_map(move |server| {
                                transport.address_translation(server, &address)
                            })
                            .collect();

                        // remove duplicates
                        addrs.sort_unstable();
                        addrs.dedup();
                        addrs
                    };
                    for addr in translated_addresses {
                        this.add_external_address(addr, score);
                    }
                }
                Poll::Ready(NetworkBehaviourAction::CloseConnection {
                    peer_id,
                    connection,
                }) => match connection {
                    CloseConnection::One(connection_id) => {
                        if let Some(conn) = this.pool.get_established(connection_id) {
                            conn.start_close();
                        }
                    }
                    CloseConnection::All => {
                        this.pool.disconnect(peer_id);
                    }
                },
            }
        }
    }
}

/// Connection to notify of a pending event.
///
/// The connection IDs out of which to notify one of an event are captured at
/// the time the behaviour emits the event, in order not to forward the event to
/// a new connection which the behaviour may not have been aware of at the time
/// it issued the request for sending it.
enum PendingNotifyHandler {
    One(ConnectionId),
    Any(SmallVec<[ConnectionId; 10]>),
}

/// Notify a single connection of an event.
///
/// Returns `Some` with the given event if the connection is not currently
/// ready to receive another event, in which case the current task is
/// scheduled to be woken up.
///
/// Returns `None` if the connection is closing or the event has been
/// successfully sent, in either case the event is consumed.
fn notify_one<'a, THandlerInEvent>(
    conn: &mut EstablishedConnection<'a, THandlerInEvent>,
    event: THandlerInEvent,
    cx: &mut Context<'_>,
) -> Option<THandlerInEvent> {
    match conn.poll_ready_notify_handler(cx) {
        Poll::Pending => Some(event),
        Poll::Ready(Err(())) => None, // connection is closing
        Poll::Ready(Ok(())) => {
            // Can now only fail if connection is closing.
            let _ = conn.notify_handler(event);
            None
        }
    }
}

/// Notify any one of a given list of connections of a peer of an event.
///
/// Returns `Some` with the given event and a new list of connections if
/// none of the given connections was able to receive the event but at
/// least one of them is not closing, in which case the current task
/// is scheduled to be woken up. The returned connections are those which
/// may still become ready to receive another event.
///
/// Returns `None` if either all connections are closing or the event
/// was successfully sent to a handler, in either case the event is consumed.
fn notify_any<TTrans, THandler, TBehaviour>(
    ids: SmallVec<[ConnectionId; 10]>,
    pool: &mut Pool<THandler, TTrans>,
    event: THandlerInEvent<TBehaviour>,
    cx: &mut Context<'_>,
) -> Option<(THandlerInEvent<TBehaviour>, SmallVec<[ConnectionId; 10]>)>
where
    TTrans: Transport,
    TTrans::Error: Send + 'static,
    TBehaviour: NetworkBehaviour,
    THandler: IntoConnectionHandler,
    THandler::Handler: ConnectionHandler<
        InEvent = THandlerInEvent<TBehaviour>,
        OutEvent = THandlerOutEvent<TBehaviour>,
    >,
{
    let mut pending = SmallVec::new();
    let mut event = Some(event); // (1)
    for id in ids.into_iter() {
        if let Some(mut conn) = pool.get_established(id) {
            match conn.poll_ready_notify_handler(cx) {
                Poll::Pending => pending.push(id),
                Poll::Ready(Err(())) => {} // connection is closing
                Poll::Ready(Ok(())) => {
                    let e = event.take().expect("by (1),(2)");
                    if let Err(e) = conn.notify_handler(e) {
                        event = Some(e) // (2)
                    } else {
                        break;
                    }
                }
            }
        }
    }

    event.and_then(|e| {
        if !pending.is_empty() {
            Some((e, pending))
        } else {
            None
        }
    })
}

/// Stream of events returned by [`Swarm`].
///
/// Includes events from the [`NetworkBehaviour`] as well as events about
/// connection and listener status. See [`SwarmEvent`] for details.
///
/// Note: This stream is infinite and it is guaranteed that
/// [`Stream::poll_next`] will never return `Poll::Ready(None)`.
impl<TBehaviour> Stream for Swarm<TBehaviour>
where
    TBehaviour: NetworkBehaviour,
{
    type Item = SwarmEvent<TBehaviourOutEvent<TBehaviour>, THandlerErr<TBehaviour>>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.as_mut().poll_next_event(cx).map(Some)
    }
}

/// The stream of swarm events never terminates, so we can implement fused for it.
impl<TBehaviour> FusedStream for Swarm<TBehaviour>
where
    TBehaviour: NetworkBehaviour,
{
    fn is_terminated(&self) -> bool {
        false
    }
}

/// Parameters passed to `poll()`, that the `NetworkBehaviour` has access to.
// TODO: #[derive(Debug)]
pub struct SwarmPollParameters<'a> {
    local_peer_id: &'a PeerId,
    supported_protocols: &'a [Vec<u8>],
    listened_addrs: &'a [Multiaddr],
    external_addrs: &'a Addresses,
}

impl<'a> PollParameters for SwarmPollParameters<'a> {
    type SupportedProtocolsIter = std::iter::Cloned<std::slice::Iter<'a, std::vec::Vec<u8>>>;
    type ListenedAddressesIter = std::iter::Cloned<std::slice::Iter<'a, Multiaddr>>;
    type ExternalAddressesIter = AddressIntoIter;

    fn supported_protocols(&self) -> Self::SupportedProtocolsIter {
        self.supported_protocols.iter().cloned()
    }

    fn listened_addresses(&self) -> Self::ListenedAddressesIter {
        self.listened_addrs.iter().cloned()
    }

    fn external_addresses(&self) -> Self::ExternalAddressesIter {
        self.external_addrs.clone().into_iter()
    }

    fn local_peer_id(&self) -> &PeerId {
        self.local_peer_id
    }
}

/// A [`SwarmBuilder`] provides an API for configuring and constructing a [`Swarm`].
pub struct SwarmBuilder<TBehaviour> {
    local_peer_id: PeerId,
    transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
    behaviour: TBehaviour,
    pool_config: PoolConfig,
    connection_limits: ConnectionLimits,
}

impl<TBehaviour> SwarmBuilder<TBehaviour>
where
    TBehaviour: NetworkBehaviour,
{
    /// Creates a new `SwarmBuilder` from the given transport, behaviour and
    /// local peer ID. The `Swarm` with its underlying `Network` is obtained
    /// via [`SwarmBuilder::build`].
    pub fn new(
        transport: transport::Boxed<(PeerId, StreamMuxerBox)>,
        behaviour: TBehaviour,
        local_peer_id: PeerId,
    ) -> Self {
        SwarmBuilder {
            local_peer_id,
            transport,
            behaviour,
            pool_config: Default::default(),
            connection_limits: Default::default(),
        }
    }

    /// Configures the `Executor` to use for spawning background tasks.
    ///
    /// By default, unless another executor has been configured,
    /// [`SwarmBuilder::build`] will try to set up a `ThreadPool`.
    pub fn executor(mut self, e: Box<dyn Executor + Send>) -> Self {
        self.pool_config = self.pool_config.with_executor(e);
        self
    }

    /// Configures the number of events from the [`NetworkBehaviour`] in
    /// destination to the [`ConnectionHandler`] that can be buffered before
    /// the [`Swarm`] has to wait. An individual buffer with this number of
    /// events exists for each individual connection.
    ///
    /// The ideal value depends on the executor used, the CPU speed, and the
    /// volume of events. If this value is too low, then the [`Swarm`] will
    /// be sleeping more often than necessary. Increasing this value increases
    /// the overall memory usage.
    pub fn notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
        self.pool_config = self.pool_config.with_notify_handler_buffer_size(n);
        self
    }

    /// Configures the number of extra events from the [`ConnectionHandler`] in
    /// destination to the [`NetworkBehaviour`] that can be buffered before
    /// the [`ConnectionHandler`] has to go to sleep.
    ///
    /// There exists a buffer of events received from [`ConnectionHandler`]s
    /// that the [`NetworkBehaviour`] has yet to process. This buffer is
    /// shared between all instances of [`ConnectionHandler`]. Each instance of
    /// [`ConnectionHandler`] is guaranteed one slot in this buffer, meaning
    /// that delivering an event for the first time is guaranteed to be
    /// instantaneous. Any extra event delivery, however, must wait for that
    /// first event to be delivered or for an "extra slot" to be available.
    ///
    /// This option configures the number of such "extra slots" in this
    /// shared buffer. These extra slots are assigned in a first-come,
    /// first-served basis.
    ///
    /// The ideal value depends on the executor used, the CPU speed, the
    /// average number of connections, and the volume of events. If this value
    /// is too low, then the [`ConnectionHandler`]s will be sleeping more often
    /// than necessary. Increasing this value increases the overall memory
    /// usage, and more importantly the latency between the moment when an
    /// event is emitted and the moment when it is received by the
    /// [`NetworkBehaviour`].
    pub fn connection_event_buffer_size(mut self, n: usize) -> Self {
        self.pool_config = self.pool_config.with_connection_event_buffer_size(n);
        self
    }

    /// Number of addresses concurrently dialed for a single outbound connection attempt.
    pub fn dial_concurrency_factor(mut self, factor: NonZeroU8) -> Self {
        self.pool_config = self.pool_config.with_dial_concurrency_factor(factor);
        self
    }

    /// Configures the connection limits.
    pub fn connection_limits(mut self, limits: ConnectionLimits) -> Self {
        self.connection_limits = limits;
        self
    }

    /// Configures an override for the substream upgrade protocol to use.
    ///
    /// The subtream upgrade protocol is the multistream-select protocol
    /// used for protocol negotiation on substreams. Since a listener
    /// supports all existing versions, the choice of upgrade protocol
    /// only effects the "dialer", i.e. the peer opening a substream.
    ///
    /// > **Note**: If configured, specific upgrade protocols for
    /// > individual [`SubstreamProtocol`]s emitted by the `NetworkBehaviour`
    /// > are ignored.
    pub fn substream_upgrade_protocol_override(mut self, v: libp2p_core::upgrade::Version) -> Self {
        self.pool_config = self.pool_config.with_substream_upgrade_protocol_override(v);
        self
    }

    /// Builds a `Swarm` with the current configuration.
    pub fn build(mut self) -> Swarm<TBehaviour> {
        let supported_protocols = self
            .behaviour
            .new_handler()
            .inbound_protocol()
            .protocol_info()
            .into_iter()
            .map(|info| info.protocol_name().to_vec())
            .collect();

        // If no executor has been explicitly configured, try to set up a thread pool.
        let pool_config =
            self.pool_config.or_else_with_executor(|| {
                match ThreadPoolBuilder::new()
                    .name_prefix("libp2p-swarm-task-")
                    .create()
                {
                    Ok(tp) => Some(Box::new(move |f| tp.spawn_ok(f))),
                    Err(err) => {
                        log::warn!("Failed to create executor thread pool: {:?}", err);
                        None
                    }
                }
            });

        Swarm {
            local_peer_id: self.local_peer_id,
            listeners: ListenersStream::new(self.transport),
            pool: Pool::new(self.local_peer_id, pool_config, self.connection_limits),
            behaviour: self.behaviour,
            supported_protocols,
            listened_addrs: SmallVec::new(),
            external_addrs: Addresses::default(),
            banned_peers: HashSet::new(),
            banned_peer_connections: HashSet::new(),
            pending_event: None,
        }
    }
}

/// The possible failures of dialing.
#[derive(Debug)]
pub enum DialError {
    /// The peer is currently banned.
    Banned,
    /// The configured limit for simultaneous outgoing connections
    /// has been reached.
    ConnectionLimit(ConnectionLimit),
    /// The peer being dialed is the local peer and thus the dial was aborted.
    LocalPeerId,
    /// [`NetworkBehaviour::addresses_of_peer`] returned no addresses
    /// for the peer to dial.
    NoAddresses,
    /// The provided [`dial_opts::PeerCondition`] evaluated to false and thus
    /// the dial was aborted.
    DialPeerConditionFalse(dial_opts::PeerCondition),
    /// Pending connection attempt has been aborted.
    Aborted,
    /// The provided peer identity is invalid.
    InvalidPeerId(Multihash),
    /// The peer identity obtained on the connection did not match the one that was expected.
    WrongPeerId {
        obtained: PeerId,
        endpoint: ConnectedPoint,
    },
    /// An I/O error occurred on the connection.
    ConnectionIo(io::Error),
    /// An error occurred while negotiating the transport protocol(s) on a connection.
    Transport(Vec<(Multiaddr, TransportError<io::Error>)>),
}

impl From<PendingOutboundConnectionError<io::Error>> for DialError {
    fn from(error: PendingOutboundConnectionError<io::Error>) -> Self {
        match error {
            PendingConnectionError::ConnectionLimit(limit) => DialError::ConnectionLimit(limit),
            PendingConnectionError::Aborted => DialError::Aborted,
            PendingConnectionError::WrongPeerId { obtained, endpoint } => {
                DialError::WrongPeerId { obtained, endpoint }
            }
            PendingConnectionError::IO(e) => DialError::ConnectionIo(e),
            PendingConnectionError::Transport(e) => DialError::Transport(e),
        }
    }
}

impl fmt::Display for DialError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            DialError::ConnectionLimit(err) => write!(f, "Dial error: {}", err),
            DialError::NoAddresses => write!(f, "Dial error: no addresses for peer."),
            DialError::LocalPeerId => write!(f, "Dial error: tried to dial local peer id."),
            DialError::Banned => write!(f, "Dial error: peer is banned."),
            DialError::DialPeerConditionFalse(c) => {
                write!(
                    f,
                    "Dial error: condition {:?} for dialing peer was false.",
                    c
                )
            }
            DialError::Aborted => write!(
                f,
                "Dial error: Pending connection attempt has been aborted."
            ),
            DialError::InvalidPeerId(multihash) => write!(f, "Dial error: multihash {:?} is not a PeerId", multihash),
            DialError::WrongPeerId { obtained, endpoint} => write!(f, "Dial error: Unexpected peer ID {} at {:?}.", obtained, endpoint),
            DialError::ConnectionIo(e) => write!(
                f,
                "Dial error: An I/O error occurred on the connection: {:?}.", e
            ),
            DialError::Transport(e) => write!(f, "An error occurred while negotiating the transport protocol(s) on a connection: {:?}.", e),
        }
    }
}

impl error::Error for DialError {
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        match self {
            DialError::ConnectionLimit(err) => Some(err),
            DialError::LocalPeerId => None,
            DialError::NoAddresses => None,
            DialError::Banned => None,
            DialError::DialPeerConditionFalse(_) => None,
            DialError::Aborted => None,
            DialError::InvalidPeerId { .. } => None,
            DialError::WrongPeerId { .. } => None,
            DialError::ConnectionIo(_) => None,
            DialError::Transport(_) => None,
        }
    }
}

/// Dummy implementation of [`NetworkBehaviour`] that doesn't do anything.
#[derive(Clone)]
pub struct DummyBehaviour {
    keep_alive: KeepAlive,
}

impl DummyBehaviour {
    pub fn with_keep_alive(keep_alive: KeepAlive) -> Self {
        Self { keep_alive }
    }

    pub fn keep_alive_mut(&mut self) -> &mut KeepAlive {
        &mut self.keep_alive
    }
}

impl Default for DummyBehaviour {
    fn default() -> Self {
        Self {
            keep_alive: KeepAlive::No,
        }
    }
}

impl NetworkBehaviour for DummyBehaviour {
    type ConnectionHandler = handler::DummyConnectionHandler;
    type OutEvent = void::Void;

    fn new_handler(&mut self) -> Self::ConnectionHandler {
        handler::DummyConnectionHandler {
            keep_alive: self.keep_alive,
        }
    }

    fn inject_event(
        &mut self,
        _: PeerId,
        _: ConnectionId,
        event: <Self::ConnectionHandler as ConnectionHandler>::OutEvent,
    ) {
        void::unreachable(event)
    }

    fn poll(
        &mut self,
        _: &mut Context<'_>,
        _: &mut impl PollParameters,
    ) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
        Poll::Pending
    }
}

/// Information about the connections obtained by [`Swarm::network_info()`].
#[derive(Clone, Debug)]
pub struct NetworkInfo {
    /// The total number of connected peers.
    num_peers: usize,
    /// Counters of ongoing network connections.
    connection_counters: ConnectionCounters,
}

impl NetworkInfo {
    /// The number of connected peers, i.e. peers with whom at least
    /// one established connection exists.
    pub fn num_peers(&self) -> usize {
        self.num_peers
    }

    /// Gets counters for ongoing network connections.
    pub fn connection_counters(&self) -> &ConnectionCounters {
        &self.connection_counters
    }
}

/// Ensures a given `Multiaddr` is a `/p2p/...` address for the given peer.
///
/// If the given address is already a `p2p` address for the given peer,
/// i.e. the last encapsulated protocol is `/p2p/<peer-id>`, this is a no-op.
///
/// If the given address is already a `p2p` address for a different peer
/// than the one given, the given `Multiaddr` is returned as an `Err`.
///
/// If the given address is not yet a `p2p` address for the given peer,
/// the `/p2p/<peer-id>` protocol is appended to the returned address.
fn p2p_addr(peer: Option<PeerId>, addr: Multiaddr) -> Result<Multiaddr, Multiaddr> {
    let peer = match peer {
        Some(p) => p,
        None => return Ok(addr),
    };

    if let Some(Protocol::P2p(hash)) = addr.iter().last() {
        if &hash != peer.as_ref() {
            return Err(addr);
        }
        Ok(addr)
    } else {
        Ok(addr.with(Protocol::P2p(peer.into())))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::handler::DummyConnectionHandler;
    use crate::test::{CallTraceBehaviour, MockBehaviour};
    use futures::executor::block_on;
    use futures::future::poll_fn;
    use futures::future::Either;
    use futures::{executor, future, ready};
    use libp2p::core::{identity, multiaddr, transport, upgrade};
    use libp2p::plaintext;
    use libp2p::yamux;
    use libp2p_core::multiaddr::multiaddr;
    use libp2p_core::transport::ListenerEvent;
    use libp2p_core::Endpoint;
    use quickcheck::{quickcheck, Arbitrary, Gen, QuickCheck};
    use rand::prelude::SliceRandom;
    use rand::Rng;

    // Test execution state.
    // Connection => Disconnecting => Connecting.
    enum State {
        Connecting,
        Disconnecting,
    }

    fn new_test_swarm<T, O>(
        handler_proto: T,
    ) -> SwarmBuilder<CallTraceBehaviour<MockBehaviour<T, O>>>
    where
        T: ConnectionHandler + Clone,
        T::OutEvent: Clone,
        O: Send + 'static,
    {
        let id_keys = identity::Keypair::generate_ed25519();
        let local_public_key = id_keys.public();
        let transport = transport::MemoryTransport::default()
            .upgrade(upgrade::Version::V1)
            .authenticate(plaintext::PlainText2Config {
                local_public_key: local_public_key.clone(),
            })
            .multiplex(yamux::YamuxConfig::default())
            .boxed();
        let behaviour = CallTraceBehaviour::new(MockBehaviour::new(handler_proto));
        SwarmBuilder::new(transport, behaviour, local_public_key.into())
    }

    fn swarms_connected<TBehaviour>(
        swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
        swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
        num_connections: usize,
    ) -> bool
    where
        TBehaviour: NetworkBehaviour,
        <<TBehaviour::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent: Clone,
    {
        swarm1
            .behaviour()
            .num_connections_to_peer(*swarm2.local_peer_id())
            == num_connections
            && swarm2
                .behaviour()
                .num_connections_to_peer(*swarm1.local_peer_id())
                == num_connections
            && swarm1.is_connected(swarm2.local_peer_id())
            && swarm2.is_connected(swarm1.local_peer_id())
    }

    fn swarms_disconnected<TBehaviour: NetworkBehaviour>(
        swarm1: &Swarm<CallTraceBehaviour<TBehaviour>>,
        swarm2: &Swarm<CallTraceBehaviour<TBehaviour>>,
    ) -> bool
    where
        TBehaviour: NetworkBehaviour,
        <<TBehaviour::ConnectionHandler as IntoConnectionHandler>::Handler as ConnectionHandler>::OutEvent: Clone
    {
        swarm1
            .behaviour()
            .num_connections_to_peer(*swarm2.local_peer_id())
            == 0
            && swarm2
                .behaviour()
                .num_connections_to_peer(*swarm1.local_peer_id())
                == 0
            && !swarm1.is_connected(swarm2.local_peer_id())
            && !swarm2.is_connected(swarm1.local_peer_id())
    }

    /// Establishes multiple connections between two peers,
    /// after which one peer bans the other.
    ///
    /// The test expects both behaviours to be notified via pairs of
    /// inject_connected / inject_disconnected as well as
    /// inject_connection_established / inject_connection_closed calls
    /// while unbanned.
    ///
    /// While the ban is in effect, further dials occur. For these connections no
    /// `inject_connected`, `inject_connection_established`, `inject_disconnected`,
    /// `inject_connection_closed` calls should be registered.
    #[test]
    fn test_connect_disconnect_ban() {
        // Since the test does not try to open any substreams, we can
        // use the dummy protocols handler.
        let handler_proto = DummyConnectionHandler {
            keep_alive: KeepAlive::Yes,
        };

        let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()).build();
        let mut swarm2 = new_test_swarm::<_, ()>(handler_proto).build();

        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();

        swarm1.listen_on(addr1.clone()).unwrap();
        swarm2.listen_on(addr2.clone()).unwrap();

        let swarm1_id = *swarm1.local_peer_id();

        enum Stage {
            /// Waiting for the peers to connect. Banning has not occurred.
            Connecting,
            /// Ban occurred.
            Banned,
            // Ban is in place and a dial is ongoing.
            BannedDial,
            // Mid-ban dial was registered and the peer was unbanned.
            Unbanned,
            // There are dial attempts ongoing for the no longer banned peers.
            Reconnecting,
        }

        let num_connections = 10;

        for _ in 0..num_connections {
            swarm1.dial(addr2.clone()).unwrap();
        }

        let mut s1_expected_conns = num_connections;
        let mut s2_expected_conns = num_connections;

        let mut stage = Stage::Connecting;

        executor::block_on(future::poll_fn(move |cx| loop {
            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
            match stage {
                Stage::Connecting => {
                    if swarm1.behaviour.assert_connected(s1_expected_conns, 1)
                        && swarm2.behaviour.assert_connected(s2_expected_conns, 1)
                    {
                        // Setup to test that already established connections are correctly closed
                        // and reported as such after the peer is banned.
                        swarm2.ban_peer_id(swarm1_id);
                        stage = Stage::Banned;
                    }
                }
                Stage::Banned => {
                    if swarm1.behaviour.assert_disconnected(s1_expected_conns, 1)
                        && swarm2.behaviour.assert_disconnected(s2_expected_conns, 1)
                    {
                        // Setup to test that new connections of banned peers are not reported.
                        swarm1.dial(addr2.clone()).unwrap();
                        s1_expected_conns += 1;
                        stage = Stage::BannedDial;
                    }
                }
                Stage::BannedDial => {
                    if swarm2.network_info().num_peers() == 1 {
                        // The banned connection was established. Check that it was not reported to
                        // the behaviour of the banning swarm.
                        assert_eq!(
                            swarm2.behaviour.inject_connection_established.len(), s2_expected_conns,
                            "No additional closed connections should be reported for the banned peer"
                        );

                        // Setup to test that the banned connection is not reported upon closing
                        // even if the peer is unbanned.
                        swarm2.unban_peer_id(swarm1_id);
                        stage = Stage::Unbanned;
                    }
                }
                Stage::Unbanned => {
                    if swarm2.network_info().num_peers() == 0 {
                        // The banned connection has closed. Check that it was not reported.
                        assert_eq!(
                            swarm2.behaviour.inject_connection_closed.len(), s2_expected_conns,
                            "No additional closed connections should be reported for the banned peer"
                        );
                        assert!(swarm2.banned_peer_connections.is_empty());

                        // Setup to test that a ban lifted does not affect future connections.
                        for _ in 0..num_connections {
                            swarm1.dial(addr2.clone()).unwrap();
                        }
                        s1_expected_conns += num_connections;
                        s2_expected_conns += num_connections;
                        stage = Stage::Reconnecting;
                    }
                }
                Stage::Reconnecting => {
                    if swarm1.behaviour.inject_connection_established.len() == s1_expected_conns
                        && swarm2.behaviour.assert_connected(s2_expected_conns, 2)
                    {
                        return Poll::Ready(());
                    }
                }
            }

            if poll1.is_pending() && poll2.is_pending() {
                return Poll::Pending;
            }
        }))
    }

    /// Establishes multiple connections between two peers,
    /// after which one peer disconnects the other using [`Swarm::disconnect_peer_id`].
    ///
    /// The test expects both behaviours to be notified via pairs of
    /// inject_connected / inject_disconnected as well as
    /// inject_connection_established / inject_connection_closed calls.
    #[test]
    fn test_swarm_disconnect() {
        // Since the test does not try to open any substreams, we can
        // use the dummy protocols handler.
        let handler_proto = DummyConnectionHandler {
            keep_alive: KeepAlive::Yes,
        };

        let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()).build();
        let mut swarm2 = new_test_swarm::<_, ()>(handler_proto).build();

        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();

        swarm1.listen_on(addr1.clone()).unwrap();
        swarm2.listen_on(addr2.clone()).unwrap();

        let swarm1_id = *swarm1.local_peer_id();

        let mut reconnected = false;
        let num_connections = 10;

        for _ in 0..num_connections {
            swarm1.dial(addr2.clone()).unwrap();
        }
        let mut state = State::Connecting;

        executor::block_on(future::poll_fn(move |cx| loop {
            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
            match state {
                State::Connecting => {
                    if swarms_connected(&swarm1, &swarm2, num_connections) {
                        if reconnected {
                            return Poll::Ready(());
                        }
                        swarm2
                            .disconnect_peer_id(swarm1_id)
                            .expect("Error disconnecting");
                        state = State::Disconnecting;
                    }
                }
                State::Disconnecting => {
                    if swarms_disconnected(&swarm1, &swarm2) {
                        if reconnected {
                            return Poll::Ready(());
                        }
                        reconnected = true;
                        for _ in 0..num_connections {
                            swarm2.dial(addr1.clone()).unwrap();
                        }
                        state = State::Connecting;
                    }
                }
            }

            if poll1.is_pending() && poll2.is_pending() {
                return Poll::Pending;
            }
        }))
    }

    /// Establishes multiple connections between two peers,
    /// after which one peer disconnects the other
    /// using [`NetworkBehaviourAction::CloseConnection`] returned by a [`NetworkBehaviour`].
    ///
    /// The test expects both behaviours to be notified via pairs of
    /// inject_connected / inject_disconnected as well as
    /// inject_connection_established / inject_connection_closed calls.
    #[test]
    fn test_behaviour_disconnect_all() {
        // Since the test does not try to open any substreams, we can
        // use the dummy protocols handler.
        let handler_proto = DummyConnectionHandler {
            keep_alive: KeepAlive::Yes,
        };

        let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()).build();
        let mut swarm2 = new_test_swarm::<_, ()>(handler_proto).build();

        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();

        swarm1.listen_on(addr1.clone()).unwrap();
        swarm2.listen_on(addr2.clone()).unwrap();

        let swarm1_id = *swarm1.local_peer_id();

        let mut reconnected = false;
        let num_connections = 10;

        for _ in 0..num_connections {
            swarm1.dial(addr2.clone()).unwrap();
        }
        let mut state = State::Connecting;

        executor::block_on(future::poll_fn(move |cx| loop {
            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
            match state {
                State::Connecting => {
                    if swarms_connected(&swarm1, &swarm2, num_connections) {
                        if reconnected {
                            return Poll::Ready(());
                        }
                        swarm2.behaviour.inner().next_action.replace(
                            NetworkBehaviourAction::CloseConnection {
                                peer_id: swarm1_id,
                                connection: CloseConnection::All,
                            },
                        );
                        state = State::Disconnecting;
                        continue;
                    }
                }
                State::Disconnecting => {
                    if swarms_disconnected(&swarm1, &swarm2) {
                        reconnected = true;
                        for _ in 0..num_connections {
                            swarm2.dial(addr1.clone()).unwrap();
                        }
                        state = State::Connecting;
                        continue;
                    }
                }
            }

            if poll1.is_pending() && poll2.is_pending() {
                return Poll::Pending;
            }
        }))
    }

    /// Establishes multiple connections between two peers,
    /// after which one peer closes a single connection
    /// using [`NetworkBehaviourAction::CloseConnection`] returned by a [`NetworkBehaviour`].
    ///
    /// The test expects both behaviours to be notified via pairs of
    /// inject_connected / inject_disconnected as well as
    /// inject_connection_established / inject_connection_closed calls.
    #[test]
    fn test_behaviour_disconnect_one() {
        // Since the test does not try to open any substreams, we can
        // use the dummy protocols handler.
        let handler_proto = DummyConnectionHandler {
            keep_alive: KeepAlive::Yes,
        };

        let mut swarm1 = new_test_swarm::<_, ()>(handler_proto.clone()).build();
        let mut swarm2 = new_test_swarm::<_, ()>(handler_proto).build();

        let addr1: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();
        let addr2: Multiaddr = multiaddr::Protocol::Memory(rand::random::<u64>()).into();

        swarm1.listen_on(addr1.clone()).unwrap();
        swarm2.listen_on(addr2.clone()).unwrap();

        let swarm1_id = *swarm1.local_peer_id();

        let num_connections = 10;

        for _ in 0..num_connections {
            swarm1.dial(addr2.clone()).unwrap();
        }
        let mut state = State::Connecting;
        let mut disconnected_conn_id = None;

        executor::block_on(future::poll_fn(move |cx| loop {
            let poll1 = Swarm::poll_next_event(Pin::new(&mut swarm1), cx);
            let poll2 = Swarm::poll_next_event(Pin::new(&mut swarm2), cx);
            match state {
                State::Connecting => {
                    if swarms_connected(&swarm1, &swarm2, num_connections) {
                        disconnected_conn_id = {
                            let conn_id = swarm2.behaviour.inject_connection_established
                                [num_connections / 2]
                                .1;
                            swarm2.behaviour.inner().next_action.replace(
                                NetworkBehaviourAction::CloseConnection {
                                    peer_id: swarm1_id,
                                    connection: CloseConnection::One(conn_id),
                                },
                            );
                            Some(conn_id)
                        };
                        state = State::Disconnecting;
                    }
                }
                State::Disconnecting => {
                    for s in &[&swarm1, &swarm2] {
                        assert!(s
                            .behaviour
                            .inject_connection_closed
                            .iter()
                            .all(|(.., remaining_conns)| *remaining_conns > 0));
                        assert_eq!(
                            s.behaviour.inject_connection_established.len(),
                            num_connections
                        );
                        s.behaviour.assert_connected(num_connections, 1);
                    }
                    if [&swarm1, &swarm2]
                        .iter()
                        .all(|s| s.behaviour.inject_connection_closed.len() == 1)
                    {
                        let conn_id = swarm2.behaviour.inject_connection_closed[0].1;
                        assert_eq!(Some(conn_id), disconnected_conn_id);
                        return Poll::Ready(());
                    }
                }
            }

            if poll1.is_pending() && poll2.is_pending() {
                return Poll::Pending;
            }
        }))
    }

    #[test]
    fn concurrent_dialing() {
        #[derive(Clone, Debug)]
        struct DialConcurrencyFactor(NonZeroU8);

        impl Arbitrary for DialConcurrencyFactor {
            fn arbitrary<G: Gen>(g: &mut G) -> Self {
                Self(NonZeroU8::new(g.gen_range(1, 11)).unwrap())
            }
        }

        fn prop(concurrency_factor: DialConcurrencyFactor) {
            block_on(async {
                let mut swarm = new_test_swarm::<_, ()>(DummyConnectionHandler {
                    keep_alive: KeepAlive::Yes,
                })
                .dial_concurrency_factor(concurrency_factor.0)
                .build();

                // Listen on `concurrency_factor + 1` addresses.
                //
                // `+ 2` to ensure a subset of addresses is dialed by network_2.
                let num_listen_addrs = concurrency_factor.0.get() + 2;
                let mut listen_addresses = Vec::new();
                let mut listeners = Vec::new();
                for _ in 0..num_listen_addrs {
                    let mut listener = transport::MemoryTransport {}
                        .listen_on("/memory/0".parse().unwrap())
                        .unwrap();

                    match listener.next().await.unwrap().unwrap() {
                        ListenerEvent::NewAddress(address) => {
                            listen_addresses.push(address);
                        }
                        _ => panic!("Expected `NewListenAddr` event."),
                    }

                    listeners.push(listener);
                }

                // Have swarm dial each listener and wait for each listener to receive the incoming
                // connections.
                swarm
                    .dial(
                        DialOpts::peer_id(PeerId::random())
                            .addresses(listen_addresses.into())
                            .build(),
                    )
                    .unwrap();
                for mut listener in listeners.into_iter() {
                    loop {
                        match futures::future::select(listener.next(), swarm.next()).await {
                            Either::Left((Some(Ok(ListenerEvent::Upgrade { .. })), _)) => {
                                break;
                            }
                            Either::Left(_) => {
                                panic!("Unexpected listener event.")
                            }
                            Either::Right((e, _)) => {
                                panic!("Expect swarm to not emit any event {:?}", e)
                            }
                        }
                    }
                }

                match swarm.next().await.unwrap() {
                    SwarmEvent::OutgoingConnectionError { .. } => {}
                    e => panic!("Unexpected swarm event {:?}", e),
                }
            })
        }

        QuickCheck::new().tests(10).quickcheck(prop as fn(_) -> _);
    }

    #[test]
    fn max_outgoing() {
        use rand::Rng;

        let outgoing_limit = rand::thread_rng().gen_range(1, 10);

        let limits = ConnectionLimits::default().with_max_pending_outgoing(Some(outgoing_limit));
        let mut network = new_test_swarm::<_, ()>(DummyConnectionHandler {
            keep_alive: KeepAlive::Yes,
        })
        .connection_limits(limits)
        .build();

        let addr: Multiaddr = "/memory/1234".parse().unwrap();

        let target = PeerId::random();
        for _ in 0..outgoing_limit {
            network
                .dial(
                    DialOpts::peer_id(target)
                        .addresses(vec![addr.clone()])
                        .build(),
                )
                .ok()
                .expect("Unexpected connection limit.");
        }

        match network
            .dial(
                DialOpts::peer_id(target)
                    .addresses(vec![addr.clone()])
                    .build(),
            )
            .expect_err("Unexpected dialing success.")
        {
            DialError::ConnectionLimit(limit) => {
                assert_eq!(limit.current, outgoing_limit);
                assert_eq!(limit.limit, outgoing_limit);
            }
            e => panic!("Unexpected error: {:?}", e),
        }

        let info = network.network_info();
        assert_eq!(info.num_peers(), 0);
        assert_eq!(
            info.connection_counters().num_pending_outgoing(),
            outgoing_limit
        );
    }

    #[test]
    fn max_established_incoming() {
        use rand::Rng;

        #[derive(Debug, Clone)]
        struct Limit(u32);

        impl Arbitrary for Limit {
            fn arbitrary<G: Gen>(g: &mut G) -> Self {
                Self(g.gen_range(1, 10))
            }
        }

        fn limits(limit: u32) -> ConnectionLimits {
            ConnectionLimits::default().with_max_established_incoming(Some(limit))
        }

        fn prop(limit: Limit) {
            let limit = limit.0;

            let mut network1 = new_test_swarm::<_, ()>(DummyConnectionHandler {
                keep_alive: KeepAlive::Yes,
            })
            .connection_limits(limits(limit))
            .build();
            let mut network2 = new_test_swarm::<_, ()>(DummyConnectionHandler {
                keep_alive: KeepAlive::Yes,
            })
            .connection_limits(limits(limit))
            .build();

            let _ = network1.listen_on(multiaddr![Memory(0u64)]).unwrap();
            let listen_addr = async_std::task::block_on(poll_fn(|cx| {
                match ready!(network1.poll_next_unpin(cx)).unwrap() {
                    SwarmEvent::NewListenAddr { address, .. } => Poll::Ready(address),
                    e => panic!("Unexpected network event: {:?}", e),
                }
            }));

            // Spawn and block on the dialer.
            async_std::task::block_on({
                let mut n = 0;
                let _ = network2.dial(listen_addr.clone()).unwrap();

                let mut expected_closed = false;
                let mut network_1_established = false;
                let mut network_2_established = false;
                let mut network_1_limit_reached = false;
                let mut network_2_limit_reached = false;
                poll_fn(move |cx| {
                    loop {
                        let mut network_1_pending = false;
                        let mut network_2_pending = false;

                        match network1.poll_next_unpin(cx) {
                            Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) => {}
                            Poll::Ready(Some(SwarmEvent::ConnectionEstablished { .. })) => {
                                network_1_established = true;
                            }
                            Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
                                error: PendingConnectionError::ConnectionLimit(err),
                                ..
                            })) => {
                                assert_eq!(err.limit, limit);
                                assert_eq!(err.limit, err.current);
                                let info = network1.network_info();
                                let counters = info.connection_counters();
                                assert_eq!(counters.num_established_incoming(), limit);
                                assert_eq!(counters.num_established(), limit);
                                network_1_limit_reached = true;
                            }
                            Poll::Pending => {
                                network_1_pending = true;
                            }
                            e => panic!("Unexpected network event: {:?}", e),
                        }

                        match network2.poll_next_unpin(cx) {
                            Poll::Ready(Some(SwarmEvent::ConnectionEstablished { .. })) => {
                                network_2_established = true;
                            }
                            Poll::Ready(Some(SwarmEvent::ConnectionClosed { .. })) => {
                                assert!(expected_closed);
                                let info = network2.network_info();
                                let counters = info.connection_counters();
                                assert_eq!(counters.num_established_outgoing(), limit);
                                assert_eq!(counters.num_established(), limit);
                                network_2_limit_reached = true;
                            }
                            Poll::Pending => {
                                network_2_pending = true;
                            }
                            e => panic!("Unexpected network event: {:?}", e),
                        }

                        if network_1_pending && network_2_pending {
                            return Poll::Pending;
                        }

                        if network_1_established && network_2_established {
                            network_1_established = false;
                            network_2_established = false;

                            if n <= limit {
                                // Dial again until the limit is exceeded.
                                n += 1;
                                network2.dial(listen_addr.clone()).unwrap();

                                if n == limit {
                                    // The the next dialing attempt exceeds the limit, this
                                    // is the connection we expected to get closed.
                                    expected_closed = true;
                                }
                            } else {
                                panic!("Expect networks not to establish connections beyond the limit.")
                            }
                        }

                        if network_1_limit_reached && network_2_limit_reached {
                            return Poll::Ready(());
                        }
                    }
                })
            });
        }

        quickcheck(prop as fn(_));
    }

    #[test]
    fn invalid_peer_id() {
        // Checks whether dialing an address containing the wrong peer id raises an error
        // for the expected peer id instead of the obtained peer id.

        let mut swarm1 = new_test_swarm::<_, ()>(DummyConnectionHandler::default()).build();
        let mut swarm2 = new_test_swarm::<_, ()>(DummyConnectionHandler::default()).build();

        swarm1.listen_on("/memory/0".parse().unwrap()).unwrap();

        let address =
            futures::executor::block_on(future::poll_fn(|cx| match swarm1.poll_next_unpin(cx) {
                Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => {
                    Poll::Ready(address)
                }
                Poll::Pending => Poll::Pending,
                _ => panic!("Was expecting the listen address to be reported"),
            }));

        let other_id = PeerId::random();
        let other_addr = address.with(Protocol::P2p(other_id.into()));

        swarm2.dial(other_addr.clone()).unwrap();

        let (peer_id, error) = futures::executor::block_on(future::poll_fn(|cx| {
            if let Poll::Ready(Some(SwarmEvent::IncomingConnection { .. })) =
                swarm1.poll_next_unpin(cx)
            {}

            match swarm2.poll_next_unpin(cx) {
                Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
                    peer_id, error, ..
                })) => Poll::Ready((peer_id, error)),
                Poll::Ready(x) => panic!("unexpected {:?}", x),
                Poll::Pending => Poll::Pending,
            }
        }));
        assert_eq!(peer_id.unwrap(), other_id);
        match error {
            DialError::WrongPeerId { obtained, endpoint } => {
                assert_eq!(obtained, *swarm1.local_peer_id());
                assert_eq!(
                    endpoint,
                    ConnectedPoint::Dialer {
                        address: other_addr,
                        role_override: Endpoint::Dialer,
                    }
                );
            }
            x => panic!("wrong error {:?}", x),
        }
    }

    #[test]
    fn dial_self() {
        // Check whether dialing ourselves correctly fails.
        //
        // Dialing the same address we're listening should result in three events:
        //
        // - The incoming connection notification (before we know the incoming peer ID).
        // - The connection error for the dialing endpoint (once we've determined that it's our own ID).
        // - The connection error for the listening endpoint (once we've determined that it's our own ID).
        //
        // The last two can happen in any order.

        let mut swarm = new_test_swarm::<_, ()>(DummyConnectionHandler::default()).build();
        swarm.listen_on("/memory/0".parse().unwrap()).unwrap();

        let local_address =
            futures::executor::block_on(future::poll_fn(|cx| match swarm.poll_next_unpin(cx) {
                Poll::Ready(Some(SwarmEvent::NewListenAddr { address, .. })) => {
                    Poll::Ready(address)
                }
                Poll::Pending => Poll::Pending,
                _ => panic!("Was expecting the listen address to be reported"),
            }));

        swarm.dial(local_address.clone()).unwrap();

        let mut got_dial_err = false;
        let mut got_inc_err = false;
        futures::executor::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
            loop {
                match swarm.poll_next_unpin(cx) {
                    Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
                        peer_id,
                        error: DialError::WrongPeerId { .. },
                        ..
                    })) => {
                        assert_eq!(&peer_id.unwrap(), swarm.local_peer_id());
                        assert!(!got_dial_err);
                        got_dial_err = true;
                        if got_inc_err {
                            return Poll::Ready(Ok(()));
                        }
                    }
                    Poll::Ready(Some(SwarmEvent::IncomingConnectionError {
                        local_addr, ..
                    })) => {
                        assert!(!got_inc_err);
                        assert_eq!(local_addr, local_address);
                        got_inc_err = true;
                        if got_dial_err {
                            return Poll::Ready(Ok(()));
                        }
                    }
                    Poll::Ready(Some(SwarmEvent::IncomingConnection { local_addr, .. })) => {
                        assert_eq!(local_addr, local_address);
                    }
                    Poll::Ready(ev) => {
                        panic!("Unexpected event: {:?}", ev)
                    }
                    Poll::Pending => break Poll::Pending,
                }
            }
        }))
        .unwrap();
    }

    #[test]
    fn dial_self_by_id() {
        // Trying to dial self by passing the same `PeerId` shouldn't even be possible in the first
        // place.
        let swarm = new_test_swarm::<_, ()>(DummyConnectionHandler::default()).build();
        let peer_id = *swarm.local_peer_id();
        assert!(!swarm.is_connected(&peer_id));
    }

    #[test]
    fn multiple_addresses_err() {
        // Tries dialing multiple addresses, and makes sure there's one dialing error per address.

        let target = PeerId::random();

        let mut swarm = new_test_swarm::<_, ()>(DummyConnectionHandler::default()).build();

        let mut addresses = Vec::new();
        for _ in 0..3 {
            addresses.push(multiaddr![Ip4([0, 0, 0, 0]), Tcp(rand::random::<u16>())]);
        }
        for _ in 0..5 {
            addresses.push(multiaddr![Udp(rand::random::<u16>())]);
        }
        addresses.shuffle(&mut rand::thread_rng());

        swarm
            .dial(
                DialOpts::peer_id(target)
                    .addresses(addresses.clone())
                    .build(),
            )
            .unwrap();

        futures::executor::block_on(future::poll_fn(|cx| -> Poll<Result<(), io::Error>> {
            loop {
                match swarm.poll_next_unpin(cx) {
                    Poll::Ready(Some(SwarmEvent::OutgoingConnectionError {
                        peer_id,
                        // multiaddr,
                        error: DialError::Transport(errors),
                    })) => {
                        assert_eq!(peer_id.unwrap(), target);

                        let failed_addresses =
                            errors.into_iter().map(|(addr, _)| addr).collect::<Vec<_>>();
                        assert_eq!(
                            failed_addresses,
                            addresses
                                .clone()
                                .into_iter()
                                .map(|addr| addr.with(Protocol::P2p(target.into())))
                                .collect::<Vec<_>>()
                        );

                        return Poll::Ready(Ok(()));
                    }
                    Poll::Ready(_) => unreachable!(),
                    Poll::Pending => break Poll::Pending,
                }
            }
        }))
        .unwrap();
    }

    #[test]
    fn aborting_pending_connection_surfaces_error() {
        let _ = env_logger::try_init();

        let mut dialer = new_test_swarm::<_, ()>(DummyConnectionHandler::default()).build();
        let mut listener = new_test_swarm::<_, ()>(DummyConnectionHandler::default()).build();

        let listener_peer_id = *listener.local_peer_id();
        listener.listen_on(multiaddr![Memory(0u64)]).unwrap();
        let listener_address = match block_on(listener.next()).unwrap() {
            SwarmEvent::NewListenAddr { address, .. } => address,
            e => panic!("Unexpected network event: {:?}", e),
        };

        dialer
            .dial(
                DialOpts::peer_id(listener_peer_id)
                    .addresses(vec![listener_address])
                    .build(),
            )
            .unwrap();

        dialer
            .disconnect_peer_id(listener_peer_id)
            .expect_err("Expect peer to not yet be connected.");

        match block_on(dialer.next()).unwrap() {
            SwarmEvent::OutgoingConnectionError {
                error: DialError::Aborted,
                ..
            } => {}
            e => panic!("Unexpected swarm event {:?}.", e),
        }
    }
}
